Hanging Chain#

In this example, a mode predictive controller (MPC) is used to stabilize a system of weights connected by springs. The rightmost weight is fixed in place at the origin, whereas the velocity of the leftmost weight can be controlled by an actuator. The six weights in the middle move under the influence of gravity and the forces of the springs between them.

The goal of the controller is to stabilize the system (i.e. drive the velocity of all weights to zero) with the rightmost weight at position \((1, 0)\). Additionally, a non-convex cubic constraint on the weights’ position is imposed, shown in green on the figure below.

  1## @example mpc/python/hanging-chain/hanging-chain-mpc.py
  2# This example shows how to call the alpaqa solver from Python code, applied
  3# to a challenging model predictive control problem.
  4
  5# %% Hanging chain MPC example
  6
  7import casadi as cs
  8import numpy as np
  9import os
 10from os.path import join, dirname
 11import sys
 12
 13sys.path.append(dirname(__file__))
 14from hanging_chain_dynamics import HangingChain
 15
 16# %% Build the model
 17
 18Ts = 0.05
 19N = 6  # number of balls
 20dim = 2  # dimension
 21
 22model = HangingChain(N, dim)
 23f_d = model.dynamics(Ts)
 24y_null, u_null = model.initial_state()
 25
 26param = [0.03, 1.6, 0.033 / N]  # Concrete parameters m, D, L
 27
 28# %% Apply an initial input to disturb the system
 29
 30N_dist = 3
 31u_dist = [-0.5, 0.5, 0.5] if dim == 3 else [-0.5, 0.5]
 32y_dist = model.simulate(N_dist, y_null, u_dist, param)
 33y_dist = np.hstack((np.array([y_null]).T, y_dist))
 34
 35# %% Simulate the system without a controller
 36
 37N_sim = 180
 38y_sim = model.simulate(N_sim, y_dist[:, -1], u_null, param)
 39y_sim = np.hstack((y_dist, y_sim))
 40
 41# %% Define MPC cost and constraints
 42
 43N_horiz = 12
 44
 45L_cost = model.generate_cost_fun()  # stage cost
 46y_init = cs.SX.sym("y_init", *y_null.shape)  # initial state
 47U = cs.SX.sym("U", dim * N_horiz)  # control signals over horizon
 48constr_param = cs.SX.sym("c", 3)  # Coefficients of cubic constraint function
 49mpc_param = cs.vertcat(y_init, model.params, constr_param)  # all parameters
 50U_mat = model.input_to_matrix(U)  # Input as dim by N_horiz matrix
 51
 52# Cost
 53mpc_sim = model.simulate(N_horiz, y_init, U_mat, model.params)
 54mpc_cost = 0
 55for n in range(N_horiz):  # Apply the stage cost function to each stage
 56    y_n = mpc_sim[:, n]
 57    u_n = U_mat[:, n]
 58    mpc_cost += L_cost(y_n, u_n)
 59mpc_cost_fun = cs.Function('f_mpc', [U, mpc_param], [mpc_cost])
 60
 61# Constraints
 62g_constr = lambda c, x: c[0] * x**3 + c[1] * x**2 + c[2] * x  # Cubic constr
 63constr = []
 64for n in range(N_horiz):  # For each stage,
 65    y_n = mpc_sim[:, n]
 66    for i in range(N):  # for each ball in the stage,
 67        yx_n = y_n[dim * i]  # constrain the x, y position of the ball
 68        yy_n = y_n[dim * i + dim - 1]
 69        constr += [yy_n - g_constr(constr_param, yx_n)]
 70    constr += [y_n[-1] - g_constr(constr_param, y_n[-dim])]  # Ball N+1
 71mpc_constr_fun = cs.Function("g", [U, mpc_param], [cs.vertcat(*constr)])
 72
 73# Fill in the constraint coefficients c(x-a)³ + d(x - a) + b
 74a, b, c, d = 0.6, -1.4, 5, 2.2
 75constr_coeff = [c, -3 * a * c, 3 * a * a * c + d]
 76constr_lb = b - c * a**3 - d * a
 77
 78# %% NLP formulation
 79import alpaqa as pa
 80
 81prob = pa.generate_and_compile_casadi_problem(mpc_cost_fun, mpc_constr_fun)
 82prob.C.lowerbound = -1 * np.ones((dim * N_horiz, ))
 83prob.C.upperbound = +1 * np.ones((dim * N_horiz, ))
 84prob.D.lowerbound = constr_lb * np.ones((len(constr), ))
 85
 86# %% NLP solver
 87from datetime import timedelta
 88
 89solver = pa.ALMSolver(
 90    alm_params={
 91        'ε': 1e-4,
 92        'δ': 1e-4,
 93        'Σ_0': 1e5,
 94        'max_time': timedelta(seconds=0.5),
 95    },
 96    inner_solver=pa.StructuredPANOCLBFGSSolver(
 97        panoc_params={
 98            'stop_crit': pa.ProjGradNorm2,
 99            'max_time': timedelta(seconds=0.2),
100            'hessian_step_size_heuristic': 15,
101        },
102        lbfgs_params={'memory': N_horiz},
103    ),
104)
105
106# %% MPC controller
107
108
109class MPCController:
110    tot_time = timedelta()
111    tot_it = 0
112    failures = 0
113    U = np.zeros((N_horiz * dim, ))
114    λ = np.zeros(((N + 1) * N_horiz, ))
115
116    def __init__(self, model, problem):
117        self.model = model
118        self.problem = problem
119
120    def __call__(self, y_n):
121        y_n = np.array(y_n).ravel()
122        # Set the current state as the initial state
123        self.problem.param[:y_n.shape[0]] = y_n
124        # Solve the optimal control problem
125        # (warm start using the previous solution and Lagrange multipliers)
126        self.U, self.λ, stats = solver(self.problem, self.U, self.λ)
127        # Print some solver statistics
128        print(stats['status'], stats['outer_iterations'],
129              stats['inner']['iterations'], stats['elapsed_time'],
130              stats['inner_convergence_failures'])
131        self.tot_time += stats['elapsed_time']
132        self.tot_it += stats['inner']['iterations']
133        self.failures += stats['status'] != pa.SolverStatus.Converged
134        # Print the Lagrange multipliers, shows that constraints are active
135        print(np.linalg.norm(self.λ))
136        # Return the optimal control signal for the first time step
137        return self.model.input_to_matrix(self.U)[:, 0]
138
139
140# %% Simulate the system using the MPC controller
141
142y_n = np.array(y_dist[:, -1]).ravel()  # initial state for controller
143n_state = y_n.shape[0]
144prob.param = np.concatenate((y_n, param, constr_coeff))
145
146y_mpc = np.empty((n_state, N_sim))
147controller = MPCController(model, prob)
148for n in range(N_sim):
149    # Solve the optimal control problem
150    u_n = controller(y_n)
151    # Apply the first optimal control signal to the system and simulate for
152    # one time step, then update the state
153    y_n = model.simulate(1, y_n, u_n, param).T
154    y_mpc[:, n] = y_n
155y_mpc = np.hstack((y_dist, y_mpc))
156
157print(controller.tot_it, controller.failures, controller.tot_time)
158print(prob.evaluations)
159print(f'{"g":13}:{prob.evaluations.g:6}: {prob.evaluations.time.g/prob.evaluations.g}')
160print(f'{"grad_L":13}:{prob.evaluations.grad_L:6}: {prob.evaluations.time.grad_L/prob.evaluations.grad_L}')
161print(f'{"ψ":13}:{prob.evaluations.ψ:6}: {prob.evaluations.time.ψ/prob.evaluations.ψ}')
162print(f'{"grad_ψ":13}:{prob.evaluations.grad_ψ:6}: {prob.evaluations.time.grad_ψ/prob.evaluations.grad_ψ}')
163print(f'{"grad_ψ_from_ŷ":13}:{prob.evaluations.grad_ψ_from_ŷ:6}: {prob.evaluations.time.grad_ψ_from_ŷ/prob.evaluations.grad_ψ_from_ŷ}')
164print(f'{"ψ_grad_ψ":13}:{prob.evaluations.ψ_grad_ψ:6}: {prob.evaluations.time.ψ_grad_ψ/prob.evaluations.ψ_grad_ψ}')
165
166# %% Visualize
167
168import matplotlib.pyplot as plt
169import matplotlib as mpl
170from matplotlib import patheffects
171from matplotlib.animation import FuncAnimation
172
173mpl.rcParams['animation.frame_format'] = 'svg'
174
175fig, ax = plt.subplots()
176x, y, z = model.state_to_pos(y_null)
177line, = ax.plot(x, y, '-o', label='Without MPC')
178line_ctrl, = ax.plot(x, y, '-o', label='With MPC')
179plt.legend()
180plt.ylim([-2.5, 1])
181plt.xlim([-0.25, 1.25])
182
183x = np.linspace(-0.25, 1.25, 256)
184y = np.linspace(-2.5, 1, 256)
185X, Y = np.meshgrid(x, y)
186Z = g_constr(constr_coeff, X) + constr_lb - Y
187
188fx = [patheffects.withTickedStroke(spacing=7, linewidth=0.8)]
189cgc = plt.contour(X, Y, Z, [0], colors='tab:green', linewidths=0.8)
190plt.setp(cgc.collections, path_effects=fx)
191
192
193class Animation:
194    points = []
195
196    def __call__(self, i):
197        x, y, z = model.state_to_pos(y_sim[:, i])
198        for p in self.points:
199            p.remove()
200        self.points = []
201        line.set_xdata(x)
202        line.set_ydata(y)
203        viol = y - g_constr(constr_coeff, x) + 1e-5 < constr_lb
204        if np.sum(viol):
205            self.points += ax.plot(x[viol], y[viol], 'rx', markersize=12)
206        x, y, z = model.state_to_pos(y_mpc[:, i])
207        line_ctrl.set_xdata(x)
208        line_ctrl.set_ydata(y)
209        viol = y - g_constr(constr_coeff, x) + 1e-5 < constr_lb
210        if np.sum(viol):
211            self.points += ax.plot(x[viol], y[viol], 'rx', markersize=12)
212        return [line, line_ctrl] + self.points
213
214
215ani = FuncAnimation(fig,
216                    Animation(),
217                    interval=1000 * Ts,
218                    blit=True,
219                    repeat=True,
220                    frames=1 + N_dist + N_sim)
221
222# Export the animation
223out = join(dirname(__file__), '..', '..', '..', '..', 'sphinx', 'source',
224           'sphinxstatic', 'hanging-chain.html')
225os.makedirs(dirname(out), exist_ok=True)
226with open(out, "w") as f:
227    f.write('<center>')
228    f.write(ani.to_jshtml())
229    f.write('</center>')
230
231# Show the animation
232plt.show()
  1from typing import Union
  2import casadi as cs
  3import numpy as np
  4from casadi import vertcat as vc
  5
  6
  7class HangingChain:
  8    def __init__(self, N: int, dim: int):
  9        self.N = N
 10        self.dim = dim
 11
 12        self.y1 = cs.SX.sym("y1", dim * N, 1)  # state: balls 1→N positions
 13        self.y2 = cs.SX.sym("y2", dim * N, 1)  # state: balls 1→N velocities
 14        self.y3 = cs.SX.sym("y3", dim, 1)  # state: ball  1+N position
 15        self.u = cs.SX.sym("u", dim, 1)  # input: ball  1+N velocity
 16        self.y = vc(self.y1, self.y2, self.y3)  # full state vector
 17
 18        self.m = cs.SX.sym("m")  # mass
 19        self.D = cs.SX.sym("D")  # spring constant
 20        self.L = cs.SX.sym("L")  # spring length
 21
 22        self.params = vc(self.m, self.D, self.L)
 23
 24        self.g = np.array([0, 0, -9.81] if dim == 3 else [0, -9.81])  # gravity
 25        self.x0 = np.zeros((dim, ))  # ball 0 position
 26        self.x_end = np.eye(1, dim, 0).ravel()  # ball N+1 reference position
 27
 28    def dynamics(self, Ts=0.05):
 29        y, y1, y2, y3, u = self.y, self.y1, self.y2, self.y3, self.u
 30        dist = lambda xa, xb: cs.norm_2(xa - xb)
 31        N, d = self.N, self.dim
 32        p = self.params
 33
 34        # Continuous-time dynamics y' = f(y, u; p)
 35
 36        f1 = [y2]
 37        f2 = []
 38        for i in range(N):
 39            xi = y1[d * i:d * i + d]
 40            xip1 = y1[d * i + d:d * i + d * 2] if i < N - 1 else y3
 41            Fiip1 = self.D * (1 - self.L / dist(xip1, xi)) * (xip1 - xi)
 42            xim1 = y1[d * i - d:d * i] if i > 0 else self.x0
 43            Fim1i = self.D * (1 - self.L / dist(xi, xim1)) * (xi - xim1)
 44            fi = (Fiip1 - Fim1i) / self.m + self.g
 45            f2 += [fi]
 46        f3 = [u]
 47
 48        f_expr = vc(*f1, *f2, *f3)
 49        self.f = cs.Function("f", [y, u, p], [f_expr], ["y", "u", "p"], ["y'"])
 50
 51        # Discretize dynamics y[k+1] = f_d(y[k], u[k]; p)
 52
 53        opt = {"tf": Ts, "simplify": True, "number_of_finite_elements": 4}
 54        intg = cs.integrator("intg", "rk", {
 55            "x": y,
 56            "p": vc(u, p),
 57            "ode": f_expr
 58        }, opt)
 59
 60        f_d_expr = intg(x0=y, p=vc(u, p))["xf"]
 61        self.f_d = cs.Function("f_d", [y, u, p], [f_d_expr], ["y", "u", "p"],
 62                               ["y+"])
 63
 64        return self.f_d
 65
 66    def state_to_pos(self, y):
 67        N, d = self.N, self.dim
 68        rav = lambda x: np.array(x).ravel()
 69        xdim = lambda y, i: np.concatenate(
 70            ([0], rav(y[i:d * N:d]), rav(y[-d + i])))
 71        if d == 3:
 72            return (xdim(y, 0), xdim(y, 1), xdim(y, 2))
 73        else:
 74            return (xdim(y, 0), xdim(y, 1), np.zeros((N + 1, )))
 75
 76    def input_to_matrix(self, u):
 77        """
 78        Reshape the input signal from a vector into a dim × N_horiz matrix (note
 79        that CasADi matrices are stored column-wise and NumPy arrays row-wise)
 80        """
 81        if isinstance(u, np.ndarray):
 82            return u.reshape((self.dim, u.shape[0] // self.dim), order='F')
 83        else:
 84            return u.reshape((self.dim, u.shape[0] // self.dim))
 85
 86    def simulate(self, N_sim: int, y_0: np.ndarray, u: Union[np.ndarray, list,
 87                                                             cs.SX.sym],
 88                 p: Union[np.ndarray, list, cs.SX.sym]):
 89        if isinstance(u, list):
 90            u = np.array(u)
 91        if isinstance(u, np.ndarray):
 92            if u.ndim == 1 or (u.ndim == 2 and u.shape[1] == 1):
 93                if u.shape[0] == self.dim:
 94                    u = np.tile(u, (N_sim, 1)).T
 95        return self.f_d.mapaccum(N_sim)(y_0, u, p)
 96
 97    def initial_state(self):
 98        N, d = self.N, self.dim
 99        y1_0 = np.zeros((d * N))
100        y1_0[0::d] = np.arange(1, N + 1) / (N + 1)
101        y2_0 = np.zeros((d * N))
102        y3_0 = np.zeros((d, ))
103        y3_0[0] = 1
104
105        y_null = np.concatenate((y1_0, y2_0, y3_0))
106        u_null = np.zeros((d, ))
107
108        return y_null, u_null
109
110    def generate_cost_fun(self, α=25, β=1, γ=0.01):
111        N, d = self.N, self.dim
112        y1t = cs.SX.sym("y1t", d * N, 1)
113        y2t = cs.SX.sym("y2t", d * N, 1)
114        y3t = cs.SX.sym("y3t", d, 1)
115        ut = cs.SX.sym("ut", d, 1)
116        yt = cs.vertcat(y1t, y2t, y3t)
117
118        L_cost = α * cs.sumsqr(y3t - self.x_end) + γ * cs.sumsqr(ut)
119        for i in range(N):
120            xdi = y2t[d * i:d * i + d]
121            L_cost += β * cs.sumsqr(xdi)
122        return cs.Function("L_cost", [yt, ut], [L_cost])